package com.antball.v1;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

/**
 * @Author: huangsj
 * @Date: 2018/8/23 11:23
 * @Description: Runnable Kafka consumer that subscribes to the "hadoop" topic
 *               and prints every polled record together with the thread id.
 */
public class ConsumerThread implements Runnable {

    /**
     * Loads the consumer configuration from {@code consumer.properties} on the
     * classpath, subscribes to the "hadoop" topic and polls forever, printing
     * each record's offset, partition, key and value with the current thread id.
     *
     * @throws IllegalStateException if {@code consumer.properties} is not on the classpath
     * @throws UncheckedIOException  if the properties file cannot be read
     */
    @Override
    public void run() {
        System.out.println("线程ID：" + Thread.currentThread().getId() + "线程开始时间：" + System.currentTimeMillis());
        Properties properties = loadConfig();
        /*
         * The two type parameters are the types of a record's key and value.
         * try-with-resources guarantees consumer.close() runs if the loop is
         * ever broken by an exception (e.g. WakeupException on shutdown).
         */
        try (Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties)) {
            Collection<String> topics = Arrays.asList("hadoop");
            // Subscribe the consumer to the topic(s).
            consumer.subscribe(topics);
            while (true) {
                // Pull the next batch, blocking up to 1000 ms.
                // NOTE(review): newer clients (2.0+) prefer poll(Duration).
                ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                // Walk every record in the batch.
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    long offset = consumerRecord.offset();
                    String key = consumerRecord.key();
                    String value = consumerRecord.value();
                    int partition = consumerRecord.partition();
                    System.out.println("CurrentThreadID: " + Thread.currentThread().getId() + "\toffset: " + offset + "\tpartition: " + partition + "\tkey: " + key + "\tvalue: " + value);
                }
            }
        }
    }

    /**
     * Reads {@code consumer.properties} from the classpath.
     * Fails fast instead of silently continuing with empty config, which would
     * make the subsequent KafkaConsumer construction fail obscurely.
     */
    private static Properties loadConfig() {
        Properties properties = new Properties();
        // Load relative to this class (not a sibling demo class) and close the stream.
        try (InputStream in = ConsumerThread.class.getClassLoader().getResourceAsStream("consumer.properties")) {
            if (in == null) {
                throw new IllegalStateException("consumer.properties not found on the classpath");
            }
            properties.load(in);
        } catch (IOException e) {
            // Previously swallowed via printStackTrace(); preserve the cause and abort.
            throw new UncheckedIOException("Failed to load consumer.properties", e);
        }
        return properties;
    }
}